package com.atguigu.flink.sql.definetime;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

/**
 * Created by Smexy on 2023/11/20
 */
public class Demo1_TableAPIDefineTime
{
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //基于流的环境创建表的环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

         WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                     .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                     .withTimestampAssigner( (e, ts) -> e.getTs());

        SingleOutputStreamOperator<WaterSensor> ds = env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(watermarkStrategy);

        Schema schema = Schema
            .newBuilder()
            //定义一个POJO的属性作为列
            .column("ts", "BIGINT")
            .column("id", "STRING")
            .column("vc", "INT")
            //基于一个表达式，生成一列。表达式可以是列的引用，可以是调用函数，或常量
            .columnByExpression("vc+10", "vc + 10")
            //生成处理时间
            .columnByExpression("pt", "PROCTIME()")
            //定义事件时间。由于事件时间必须是时间类型，需要把bigint类型的时间转换
            .columnByExpression("a", "TO_TIMESTAMP_LTZ(ts,3)")
            //定义水印  watermark(String 事件时间列的名字, Expression 水印如何生成)
            //.watermark("a","a - INTERVAL '0.001' SECOND")
            //如果流中自带水印，可以直接引用流中的水印
            //.watermark("a",Expressions.sourceWatermark())
            .watermark("a", "SOURCE_WATERMARK()")
            .build();
        /*
            Table fromDataStream(DataStream<T> var1, Schema var2)
                    基于流，将流转换为表，表的Schema(结构，元数据)参考var2
         */
        Table table = tableEnv.fromDataStream(ds,schema);

        /*
            `pt` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME*, 带了PROCTIME，说明这一列是处理时间
            `a` TIMESTAMP_LTZ(3) *ROWTIME* ,带了ROWTIME，说明这一列是事件时间。
                水印不属于表的列，看不到！
         */
        table.printSchema();

        table.execute().print();

    }
}
